Skip to content

GH-49669: [Python] Expose the experimental async device API through Python#49670

Draft
CurtHagenlocher wants to merge 3 commits intoapache:mainfrom
CurtHagenlocher:PythonAsyncDevice
Draft

GH-49669: [Python] Expose the experimental async device API through Python#49670
CurtHagenlocher wants to merge 3 commits intoapache:mainfrom
CurtHagenlocher:PythonAsyncDevice

Conversation

@CurtHagenlocher
Copy link
Copy Markdown
Contributor

@CurtHagenlocher CurtHagenlocher commented Apr 6, 2026

Rationale for this change

Allow an async device stream to be consumed from within Python.

What changes are included in this PR?

Code and tests to allow an async device stream to be consumed from within Python.

Are these changes tested?

Yes.

Are there any user-facing changes?

There's a new API to produce an async device stream.

DISCLOSURE: this change was heavily supported by AI. I understand the code and made some changes to it, but would likely not have put in the effort to understand how to make the change without AI help.

@github-actions
Copy link
Copy Markdown

github-actions bot commented Apr 6, 2026

⚠️ GitHub issue #49669 has been automatically assigned in GitHub to PR creator.

Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR adds initial Python bindings and tests for consuming Arrow’s experimental async device stream interface via an AsyncRecordBatchReader async-iterator.

Changes:

  • Introduces a new AsyncRecordBatchReader Cython type with async for support and schema access.
  • Adds a C++ test-only roundtrip helper to wire a producer and consumer via ArrowAsyncDeviceStreamHandler.
  • Adds Python tests validating async iteration, empty streams, schema, context manager usage, and backpressure via small queue sizing.

Reviewed changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
python/pyarrow/tests/test_ipc_async.py Adds unit tests for the new async reader behavior.
python/pyarrow/src/arrow/python/async_stream.h Adds C++ helper functions to bridge async generators and implement a roundtrip producer/consumer for tests.
python/pyarrow/lib.pyx Includes the new ipc_async.pxi bindings in the extension build.
python/pyarrow/lib.pxd Declares the new AsyncRecordBatchReader Cython type.
python/pyarrow/ipc_async.pxi Implements AsyncRecordBatchReader and a private async roundtrip test helper.
python/pyarrow/includes/libarrow.pxd Adds Cython declarations for async stream ABI / bridge entrypoints.
python/pyarrow/init.py Exports AsyncRecordBatchReader at the top-level pyarrow namespace.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +48 to +69
inline Future<AsyncRecordBatchGenerator> RoundtripAsyncBatches(
std::shared_ptr<Schema> schema, std::vector<std::shared_ptr<RecordBatch>> batches,
::arrow::internal::Executor* executor, uint64_t queue_size = 5) {
// Heap-allocate the handler so it outlives this function.
auto* handler = new ArrowAsyncDeviceStreamHandler;
std::memset(handler, 0, sizeof(ArrowAsyncDeviceStreamHandler));

auto fut_gen = CreateAsyncDeviceStreamHandler(handler, executor, queue_size);

// Submit the export to the executor so it runs concurrently with the consumer.
auto submit_result = executor->Submit(
[schema = std::move(schema), batches = std::move(batches), handler]() mutable {
auto generator = MakeVectorGenerator(std::move(batches));
return ExportAsyncRecordBatchReader(std::move(schema), std::move(generator),
DeviceAllocationType::kCPU, handler);
});

if (!submit_result.ok()) {
return Future<AsyncRecordBatchGenerator>::MakeFinished(submit_result.status());
}

return fut_gen;
Copy link

Copilot AI Apr 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RoundtripAsyncBatches heap-allocates ArrowAsyncDeviceStreamHandler but never deletes it. CreateAsyncDeviceStreamHandler sets handler->release to only delete private_data (see cpp/src/arrow/c/bridge.cc:2696-2698), and ExportAsyncRecordBatchReader calls handler->release(handler) but does not free the handler itself, so this leaks per roundtrip and can accumulate in test runs. Manage handler ownership explicitly (e.g., capture the Future returned by executor->Submit, attach a continuation that deletes the handler after ExportAsyncRecordBatchReader completes, and ensure the submit-failure path also cleans up and marks the consumer future finished).

Copilot uses AI. Check for mistakes.
Comment on lines +75 to +76
f"Do not call {self.__class__.__name__}'s constructor directly, "
"use factory methods instead.")
Copy link

Copilot AI Apr 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The constructor error message instructs users to "use factory methods instead", but no public factory is introduced for AsyncRecordBatchReader in this PR (only the private test helper _test_roundtrip_async). Either provide/mention the actual public factory in the message, or adjust the message to avoid pointing to APIs that don't exist.

Suggested change
f"Do not call {self.__class__.__name__}'s constructor directly, "
"use factory methods instead.")
f"Do not call {self.__class__.__name__}'s constructor directly; "
f"{self.__class__.__name__} instances are created by pyarrow APIs.")

Copilot uses AI. Check for mistakes.
Comment on lines 264 to +267
from pyarrow.lib import (ChunkedArray, RecordBatch, Table, table,
concat_arrays, concat_tables, TableGroupBy,
RecordBatchReader, concat_batches)
RecordBatchReader, AsyncRecordBatchReader,
concat_batches)
Copy link

Copilot AI Apr 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PR description mentions a "new API to produce an async device stream", but the Python surface added here only exports the AsyncRecordBatchReader type; there is no public factory/function that returns an instance (search shows no other AsyncRecordBatchReader references besides ipc_async.pxi and this import). If the user-facing entrypoint is intended to be part of this PR, it appears to be missing.

Copilot uses AI. Check for mistakes.
@github-actions github-actions bot added awaiting committer review Awaiting committer review and removed awaiting review Awaiting review labels Apr 7, 2026
@CurtHagenlocher CurtHagenlocher marked this pull request as draft April 7, 2026 23:35
@CurtHagenlocher
Copy link
Copy Markdown
Contributor Author

Heh... clearly I understood the code even less than I'd thought...

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants